
Conversation


@ojasvajain ojasvajain commented Nov 13, 2025

Overview

  1. Fixes #1713 ("AdminClient is stuck at CONNECT when using OAUTHBEARER with SASL_SSL protocol") by delegating OAuth token refresh events to the background thread.
  2. This has been implemented for the Admin, Producer and Consumer clients to make the behaviour consistent with the Java client.

Note that poll() will no longer trigger the OAuth refresh callbacks.

Problem

When sasl.mechanisms is set to OAUTHBEARER and an oauth_cb is provided in the configuration, users were unable to make API calls such as list_topics() or create_topics() unless poll() was called first.

>>> from confluent_kafka.admin import AdminClient, NewTopic
>>> from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
>>> import socket

>>> def oauth_cb(oauth_config):
...     auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("region")
...     return auth_token, expiry_ms/1000

>>> config = {
...     'bootstrap.servers': 'xxx-1.kafka.us-west-2.amazonaws.com:9098,xxx-2.kafka.us-west-2.amazonaws.com:9098',
...     'security.protocol': 'SASL_SSL',
...     'sasl.mechanism': 'OAUTHBEARER',
...     'oauth_cb': oauth_cb,
... }

>>> admin = AdminClient(config)
>>> admin.list_topics(timeout=2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/venv3/lib/python3.8/site-packages/confluent_kafka/admin/__init__.py", line 603, in list_topics
    return super(AdminClient, self).list_topics(*args, **kwargs)
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}

(Snippet Source: #1713 (comment))

This happens because OAuth token refresh events are pushed to the main queue, which is only serviced by poll(). These events trigger the configured OAuth callback and set the resulting token on the client. If poll() is not called before list_topics(), no token has been set, so the request fails.

Solution

Changes have been made to handle these refresh events on the background thread using the following librdkafka API calls:

  1. rd_kafka_conf_enable_sasl_queue(conf, 1) - routes all SASL/OAuth refresh events to the SASL queue instead of the main queue.

  2. rd_kafka_sasl_background_callbacks_enable(self->rk) - forwards all events on the SASL queue to the background queue, which is served by librdkafka's background thread.

Together, these two calls ensure that the OAuth callback is triggered from the background thread rather than from poll().
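For illustration, here is a minimal C sketch of how these two librdkafka calls can be combined around rd_kafka_new(); this is not the actual extension code, and refresh_cb and create_client are hypothetical names:

/* Minimal sketch (not the actual extension code): wire the SASL queue and
 * the background thread so OAUTHBEARER refresh runs without poll(). */
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Hypothetical refresh callback; the Python client would invoke the user's
 * oauth_cb here and then call rd_kafka_oauthbearer_set_token(). */
static void refresh_cb (rd_kafka_t *rk, const char *oauthbearer_config,
                        void *opaque) {
        /* ... fetch a token and call rd_kafka_oauthbearer_set_token() ... */
}

static rd_kafka_t *create_client (rd_kafka_conf_t *conf,
                                  char *errstr, size_t errstr_size) {
        rd_kafka_t *rk;
        rd_kafka_error_t *error;

        rd_kafka_conf_set_oauthbearer_token_refresh_cb(conf, refresh_cb);

        /* 1. Push SASL (OAUTHBEARER refresh) events to the SASL queue
         *    instead of the main queue served by poll(). */
        rd_kafka_conf_enable_sasl_queue(conf, 1);

        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, errstr_size);
        if (!rk)
                return NULL;

        /* 2. Let librdkafka's background thread serve the SASL queue,
         *    so the refresh callback fires without an application poll(). */
        error = rd_kafka_sasl_background_callbacks_enable(rk);
        if (error) {
                snprintf(errstr, errstr_size, "%s",
                         rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
                rd_kafka_destroy(rk);
                return NULL;
        }

        return rk;
}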

Checklist

  • Contains customer-facing changes? Including API/behavior changes
  • Did you add sufficient unit test and/or integration test coverage for this PR?

@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@ojasvajain ojasvajain changed the title from "Handle OAuth Token Refreshes Using Background Thread" to "Handle OAuth Token Refreshes Using Background Thread For Admin, Producer and Consumer Clients" Nov 14, 2025
@ojasvajain ojasvajain marked this pull request as ready for review November 14, 2025 08:05
@ojasvajain ojasvajain requested review from a team and MSeal as code owners November 14, 2025 08:05
Contributor

@MSeal MSeal left a comment


Wanted to check on expected timeout here and where we picked the value first, but otherwise lgtm

if (!h->oauth_cb)
        return 0;

int max_wait_sec = 5;
Contributor


Where did we pick this timeout? I'm not sure what the longest oauth wait time will be. Maybe this should be a bit longer in case the loopback is slow?

Member Author

@ojasvajain ojasvajain Nov 20, 2025


No particular reason to keep it at 5 sec. It should be fine to increase it (say, to 10s), as this init is a one-time thing and it will help reduce flakiness for clients with slower callback functions. For clients with faster callbacks this shouldn't be a problem, as we break out of the loop if the callback succeeds early.

Member Author


Increased to 10s
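
To illustrate the behaviour discussed in this thread, here is a minimal C sketch of a bounded wait that gives the refresh callback up to max_wait_sec to deliver the first token but breaks out as soon as it succeeds. This is not the actual extension code; handle_t, oauth_token_set and wait_for_first_token are hypothetical names:

#include <unistd.h>

/* Hypothetical handle holding a flag that the refresh callback sets
 * once it has delivered the first token. */
typedef struct {
        volatile int oauth_token_set;
} handle_t;

static int wait_for_first_token (handle_t *h) {
        int max_wait_sec = 10;      /* raised from 5s per the review */
        int waited_ms = 0;

        while (waited_ms < max_wait_sec * 1000) {
                if (h->oauth_token_set)
                        return 0;   /* fast path: callback already succeeded */
                usleep(100 * 1000); /* check again in 100 ms */
                waited_ms += 100;
        }
        return -1;                  /* callback did not complete in time */
}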

@sonarqube-confluent

Quality Gate failed

Failed conditions
0.0% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube

